package com.homeWorkMapReduce;

import com.shop.Step8;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;
import java.net.URI;

public class GrandchildRelationship extends Configured implements Tool {

    static class RelationShipMap extends Mapper<DoubleWritable, Text, Text, Text> {
        @Override
        protected void map(DoubleWritable key, Text value, Mapper<DoubleWritable, Text, Text, Text>.Context context) throws IOException, InterruptedException {
            String[] split = value.toString().split(" ");
            context.write(new Text(split[0]), new Text(split[1]));
        }
    }

    static class RelationshipReducer extends Reducer<Text, Text, Text, Text> {
        private Text result = new Text();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context) throws IOException, InterruptedException {
            for (Text val : values) {
                result.set(val);
                context.write(key, result);
            }
        }
    }

    @Override
    public int run(String[] strings) throws Exception {
        Configuration conf = getConf();
        // 指定输入输出路径
        Path input = new Path(
                "hdfs://192.168.10.11:9000/relation");
        Path output = new Path(
                "hdfs://192.168.10.11:9000/cuizongqi");
        FileSystem fs = FileSystem.get(
                new URI("hdfs://192.168.10.11:9000")
                , conf);
        if (fs.exists(output)) fs.delete(output, true);
        //构建Job
        Job job = Job.getInstance(conf);
        job.setJobName("cuiozngqi");
        job.setJarByClass(this.getClass());

        job.setMapperClass(RelationShipMap.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setReducerClass(RelationshipReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);


        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, output);
//        job.setOutputFormatClass(SequenceFileOutputFormat.class);
//        SequenceFileOutputFormat.setOutputPath(job, step8output);
        return job.waitForCompletion(true) ? 0 : -1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new GrandchildRelationship(), args));
    }
}
